package com.caissa.springboot.starter.kafka.consumer.errorhandle;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.ErrorHandler;

/**
 * Description:
 * User: wangwei
 * Date: 2021-06-18 9:43 上午
 */
@Slf4j
public class ErrMsg2EsHandler implements ErrorHandler {

    private String name;

    public ErrMsg2EsHandler(String name) {
        this.name = name;
    }

    @Override
    public void handle(Exception thrownException, ConsumerRecord<?, ?> data) {

        /**
         * TODO
         * 1.写入es，这个操作跟消息消费是同步的，可以加上熔断, topic msg excep time
         * 2.要提供错误消息的查询接口
         *
         */
        log.error("消息处理失败 {}，将消息 {} 转存至es ", thrownException.getMessage(), data);
    }

    /**
     * 自动ack
     * @return
     */
    @Override
    public boolean isAckAfterHandle() {
        return true;
    }
}
